package kafka

import (
	"github.com/Shopify/sarama"
)

// kaProxy maps a purpose identifier to the producer wrapper registered for
// it. Both known purposes start out as nil placeholders; InitAppKafka stores
// entries here and GetAppKafka reads them back.
var kaProxy = map[int]*appKafka{
	CdpPurpose:  nil,
	ScrmPurpose: nil,
}

// KaConfig holds the settings used to create one Kafka producer.
//
// Addr is the broker address list handed to sarama.NewSyncProducer by Init,
// Topic is passed to NewAppKafka, and Purpose is the key under which
// InitAppKafka stores the resulting producer (see ScrmPurpose and CdpPurpose).
// NOTE(review): Partition is not read by Init or InitAppKafka; confirm where
// it is consumed.
type KaConfig struct {
	Addr      []string `json:"addr"`
	Topic     string   `json:"topic"`
	Partition int32    `json:"partition"`
	Purpose   int      `json:"purpose"`
	// SASL username/password authentication; Init copies these three fields into sarama's Net.SASL settings.
	SaslEnable bool   `json:"saslenable"`
	User       string `json:"user"`
	Password   string `json:"password"`
}

// Purpose identifiers used as KaConfig.Purpose values and as kaProxy keys.
// The numeric values are spelled out explicitly because they also appear in
// serialized configuration (the "purpose" JSON field).
const (
	// ScrmPurpose identifies the scrm producer.
	ScrmPurpose = 0
	// CdpPurpose identifies the cdp producer.
	CdpPurpose = 1
)

// Init builds a synchronous sarama producer from cof and wraps it, together
// with cof.Topic, via NewAppKafka.
//
// The sarama configuration is the library default with these overrides: the
// SASL switch and credentials are copied from cof, Producer.Return.Successes
// is set to true (sarama requires this for a SyncProducer), and the protocol
// version is pinned to sarama.V1_1_1_0.
//
// A nil cof is reported as a sarama.ConfigurationError instead of causing a
// nil-pointer panic. Errors from sarama.NewSyncProducer are returned
// unchanged, with a nil wrapper.
//
// NOTE(review): cof.Partition and cof.Purpose are not read here.
func Init(cof *KaConfig) (*appKafka, error) {
	if cof == nil {
		return nil, sarama.ConfigurationError("nil KaConfig")
	}

	config := sarama.NewConfig()
	config.Net.SASL.Enable = cof.SaslEnable
	config.Net.SASL.User = cof.User
	config.Net.SASL.Password = cof.Password
	config.Producer.Return.Successes = true
	config.Version = sarama.V1_1_1_0

	producer, err := sarama.NewSyncProducer(cof.Addr, config)
	if err != nil {
		return nil, err
	}
	return NewAppKafka(cof.Topic, producer), nil
}

// InitAppKafka creates the producer wrapper described by cfg (via Init) and
// registers it in kaProxy under cfg.Purpose, where GetAppKafka can find it.
//
// The registry is only written after Init succeeds. If Init fails, its error
// is returned and any wrapper already registered for that purpose is left in
// place rather than being overwritten with nil.
//
// NOTE(review): on a successful re-initialisation the previously registered
// wrapper is replaced without being closed; confirm whether it needs an
// explicit shutdown.
func InitAppKafka(cfg *KaConfig) error {
	ak, err := Init(cfg)
	if err != nil {
		return err
	}
	kaProxy[cfg.Purpose] = ak
	return nil
}

// GetAppKafka returns the producer wrapper registered in kaProxy for purpose.
//
// It panics (the message reads "kafka module has not been initialised") when
// there is no non-nil wrapper stored under that key.
func GetAppKafka(purpose int) *appKafka {
	// A missing key yields a nil pointer, so a single lookup covers both
	// "never registered" and "registered as nil".
	ak := kaProxy[purpose]
	if ak == nil {
		panic("kafka 模块没有初始化")
	}
	return ak
}
